跟 Airflow 一樣,這種框架都不太好測,而 Flink 的流式處理以及他的複雜機制更是如此。
不過,我們還是可以從一些簡單的部份做起。
如果你用內建的 Source 跟 sink,我不建議你特別測試它們。
反之,如果你有自己客製化 source 如上次講的 JDBC Streaming source 的話,倒是可以用一般的測試手法處理,這裡我就不多談了。
但是,我們在設計程式的時候,應該要將 source 跟 sink 設計成可注入的方式。這樣後續在做整合測試的時候會方便許多。
我是建議不要都寫在 main() 裡面,而是將 source 跟 sink 在 main() 裡面初始化後,建立一個 job 的實例並做為它的 parameters。
這個是很基本卻也很重要的部份。大多數狀況,Flink 內建的 operator 不見得能符合我們使用。像是前面範例用的 flatMap,裡面將一行文字拆解後再輸出。
public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
// 依空格拆分,生成 (key, 1)
String[] words = value.split(" ");
for (String word : words) {
out.collect(new Tuple2<>(word, 1));
}
}
}
像這種元件,基本上會希望全部都有做到測試覆蓋。邏輯通常並不難測,因為 operator 可以的話,希望是獨立、簡單為主。太複雜的機制可能會需要用額外的狀態來保護。
以下是一個簡單的例子
public class TokenizerTest {
private Tokenizer tokenizer;
@Before
public void setUp() {
tokenizer = new Tokenizer();
}
@Test
public void testTokenizer() {
// 創建模擬的 Collector
Collector<Tuple2<String, Integer>> collector = Mockito.mock(Collector.class);
// 調用 flatMap 方法,這裡可以設定模擬的行為
tokenizer.flatMap("Hello World", collector);
// 在這裡添加斷言來檢查模擬的行為是否符合預期
verify(collector, times(2)).collect(Mockito.any(Tuple2.class));
// 如果有更具體的斷言,可以使用 ArgumentCaptor 來捕獲模擬方法的參數
ArgumentCaptor<Tuple2<String, Integer>> argumentCaptor = ArgumentCaptor.forClass(Tuple2.class);
verify(collector, times(2)).collect(argumentCaptor.capture());
// 現在你可以檢查捕獲的參數是否符合預期
// 例如,你可以檢查第一次 collect 的輸出
Tuple2<String, Integer> firstCapture = argumentCaptor.getAllValues().get(0);
assertEquals("Hello", firstCapture.f0);
assertEquals(Integer.valueOf(1), firstCapture.f1);
}
}